#!/bin/bash

# Load cluster-wide settings. This script reads KAFKA_PORT and KAFKA_TOPIC
# further down without defining them, so they presumably come from this file
# (TODO confirm). The path is relative to the current working directory, so
# the file is expected one level above wherever the script is launched from.
# Abort early when it is missing instead of continuing with empty settings.
if [ ! -r ../cluster_IP.conf ]; then
    echo "error: cannot read ../cluster_IP.conf (looked relative to $(pwd))" >&2
    exit 1
fi
source ../cluster_IP.conf

# Kafka broker address taken from the -i/--IP option; empty until parsed.
IP=""

# Help text. The \n and \t sequences are stored literally here and are only
# turned into real newlines/tabs when the text is printed with printf.
usage="usage: $0 -i/--IP <IP>...\n"
usage+="\t-i, --IP: kafka broker's IP\n"

#######################################
# Print the usage text and terminate the script.
# Globals:   usage (read) - help text containing backslash escapes (\n, \t)
# Arguments: none
# Outputs:   the expanded usage text, written to stderr
# Returns:   never returns; exits the shell with status 2 so that callers
#            and wrappers can tell a usage error from a successful run
#######################################
usage_exit() {
    # %b expands the \n and \t escapes stored in $usage while treating the
    # text as data, so a '%' in the script path ($0) cannot be misread as a
    # printf conversion specifier.
    printf '%b' "$usage" >&2
    exit 2
}

# Exactly two arguments are accepted: the -i/--IP flag followed by its value.
[ $# -eq 2 ] || usage_exit

# Consume the arguments as flag/value pairs; anything other than -i/--IP,
# or an empty value, ends the script via usage_exit.
until [ $# -lt 1 ]; do
    case "$1" in
        -i|--IP)
            if [ -z "$2" ]; then
                usage_exit
            fi
            IP=$2
            shift 2
            ;;
        *)
            usage_exit
            ;;
    esac
done

# Report the address that will be advertised by the broker.
echo "kafka IP: $IP"

#######################################
# Stop and remove every container (running or not) whose name matches a
# docker "name" filter.
# Arguments: $1 - value passed to `docker ps --filter name=...`
# Outputs:   whatever `docker stop` / `docker rm` print
# Returns:   status of the last docker command run; 0 when nothing matched
#######################################
remove_containers_by_name() {
    local name_filter=$1
    local ids

    # List once and reuse the result, so stop and rm act on the same set of
    # containers even if the container list changes in between.
    ids=$(docker ps -aq --filter name="$name_filter")
    if [ -z "$ids" ]; then
        return 0
    fi

    # Word splitting is intentional: docker prints one container ID per line.
    # shellcheck disable=SC2086
    docker stop $ids
    # shellcheck disable=SC2086
    docker rm $ids
}

# Clear out containers left over from a previous run so that the fixed
# container names used below are free again.
remove_containers_by_name zookeeper
remove_containers_by_name kafka

# KAFKA_PORT is not defined in this script; it presumably comes from the
# sourced cluster configuration (TODO confirm). Without it the port mapping
# below would be malformed, so stop here with a clear message.
: "${KAFKA_PORT:?KAFKA_PORT must be set before starting the kafka container}"

# Start ZooKeeper first: the Kafka container links to it by name.
# /etc/localtime is mounted so the container shares the host's time zone.
docker run -d --name zookeeper --publish 2181:2181 \
    --volume /etc/localtime:/etc/localtime wurstmeister/zookeeper \
    || { echo "error: failed to start the zookeeper container" >&2; exit 1; }

# Start the broker, advertising the IP given on the command line and the
# configured port to clients.
docker run -d --name kafka --publish "${KAFKA_PORT}:${KAFKA_PORT}" --link zookeeper \
    --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    --env KAFKA_ADVERTISED_HOST_NAME="${IP}" \
    --env KAFKA_ADVERTISED_PORT="${KAFKA_PORT}" \
    --volume /etc/localtime:/etc/localtime wurstmeister/kafka \
    || { echo "error: failed to start the kafka container" >&2; exit 1; }

# KAFKA_TOPIC is not defined in this script; it presumably comes from the
# sourced cluster configuration (TODO confirm).
: "${KAFKA_TOPIC:?KAFKA_TOPIC must be set before creating the topic}"

# Fixed pause to give the broker time to come up before it is used.
# NOTE(review): this is a plain delay, not a readiness check.
sleep 10s

# The Kafka tools are addressed below as /opt/<dir>/bin/...; find the
# directory under /opt whose name contains "kafka_".
kafka_version=$(docker exec kafka /bin/bash -c "ls /opt | grep kafka_")
if [ -z "$kafka_version" ]; then
    echo "error: no kafka_* directory found under /opt in the kafka container" >&2
    exit 1
fi


# Create the topic named by $KAFKA_TOPIC (one partition, replication
# factor 1). The original note calls it the "sql" topic, holding SQL/hash
# pairs that are later written to Fabric - TODO confirm.
docker exec kafka "/opt/${kafka_version}/bin/kafka-topics.sh" --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic "$KAFKA_TOPIC"
# List the topics known to the cluster so the result can be checked by eye.
docker exec kafka "/opt/${kafka_version}/bin/kafka-topics.sh" --list --zookeeper zookeeper:2181

# set the data retention time
# docker exec kafka /opt/${kafka_version}/bin/kafka-configs.sh --bootstrap-server localhost:$KAFKA_PORT \
# --alter --entity-default --entity-type brokers --add-config 'log.retention.ms=3600000'
# docker exec kafka /opt/${kafka_version}/bin/kafka-configs.sh --zookeeper zookeeper:2181 \
# --alter --entity-name mytopic --entity-type topics --add-config retention.ms=3600000

# show the current broker/topic settings
# docker exec kafka /opt/${kafka_version}/bin/kafka-configs.sh --bootstrap-server localhost:$KAFKA_PORT --describe --entity-default --entity-type brokers
# docker exec kafka /opt/${kafka_version}/bin/kafka-configs.sh --zookeeper zookeeper:2181 --describe --entity-name mytopic --entity-type topics

# run a producer
# docker exec -it kafka /opt/${kafka_version}/bin/kafka-console-producer.sh --broker-list localhost:$KAFKA_PORT --topic $KAFKA_TOPIC

# run a consumer
# docker exec kafka /opt/${kafka_version}/bin/kafka-console-consumer.sh --bootstrap-server localhost:$KAFKA_PORT --topic $KAFKA_TOPIC